package com.hkbigdata.ontimer;

import com.hkbigdata.bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Date;

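/**
 * Demo of registering and firing a processing-time timer inside a Flink
 * {@code KeyedProcessFunction}.
 *
 * @author liuanbo
 * @create 2024-05-14-19:08
 * @see 2194550857@qq.com
 */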
public class Flink01_ProcessFunction {
    /** Delay, in milliseconds, between an element's arrival and the timer registered for it. */
    private static final long TIMER_DELAY_MS = 5000L;

    /**
     * Builds and runs the streaming job.
     *
     * <p>Lines are read from a text socket on {@code localhost:9999}, parsed as
     * comma-separated {@code id,<long>,<int>} records into {@link WaterSensor} objects,
     * keyed by sensor id, and passed through a {@link KeyedProcessFunction} that
     * registers a processing-time timer {@value #TIMER_DELAY_MS} ms after each element
     * and forwards the element's {@code toString()} downstream, where it is printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorStream = env.socketTextStream("localhost", 9999)
                .flatMap(new FlatMapFunction<String, WaterSensor>() {
                    /**
                     * Parses one input line into a {@link WaterSensor}.
                     *
                     * <p>Lines with fewer than three comma-separated fields, or whose second
                     * or third field is not a valid number, are reported on stderr and
                     * skipped so that one bad line does not fail the whole job.
                     *
                     * @param value raw text line read from the socket
                     * @param out   collector receiving the parsed sensor reading
                     */
                    @Override
                    public void flatMap(String value, Collector<WaterSensor> out) throws Exception {
                        String[] fields = value.split(",");
                        if (fields.length < 3) {
                            System.err.println("Skipping malformed line (expected id,<long>,<int>): " + value);
                            return;
                        }

                        WaterSensor sensor;
                        try {
                            // Trim only the numeric fields: surrounding blanks would make
                            // valueOf() throw, while the id is passed through untouched.
                            sensor = new WaterSensor(
                                    fields[0],
                                    Long.valueOf(fields[1].trim()),
                                    Integer.valueOf(fields[2].trim())
                            );
                        } catch (NumberFormatException e) {
                            System.err.println("Skipping line with non-numeric field (" + e.getMessage() + "): " + value);
                            return;
                        }
                        out.collect(sensor);
                    }
                });


        sensorStream.keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Created once per function instance instead of once per element.
                    // SimpleDateFormat is Serializable, so it travels with the function.
                    private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                    /**
                     * Prints the current processing time, registers a processing-time timer
                     * {@code TIMER_DELAY_MS} milliseconds later, and emits the element's
                     * string form.
                     *
                     * @param value the incoming sensor reading
                     * @param ctx   context giving access to the timer service
                     * @param out   collector for the output strings
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        long now = ctx.timerService().currentProcessingTime();
                        System.out.println(formatter.format(new Date(now)));
                        System.out.println("当前时间戳:" + now);
                        // Register a processing-time timer.
                        ctx.timerService().registerProcessingTimeTimer(now + TIMER_DELAY_MS);
                        out.collect(value.toString());
                    }

                    /**
                     * Runs only when a registered timer fires; prints the timer's timestamp
                     * and a notification. Nothing is emitted downstream.
                     *
                     * @param timestamp the timestamp of the timer that fired
                     * @param ctx       timer context object
                     * @param out       output collector (unused)
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        System.out.println("定时器时间:" + timestamp);
                        System.out.println("定时器触发");
                    }
                }).print();

        env.execute();

    }
}
